=============================================================================

COMPLETE COVID-19 MEXICO DATA ANALYSIS - SPARK + AWS + PANDAS

=============================================================================

=============================================================================

LIBRARY IMPORTS

=============================================================================

In [10]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
import plotly.graph_objects as go
import warnings
warnings.filterwarnings('ignore')

from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

from pyspark.conf import SparkConf

import boto3

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import s3fs
from io import StringIO


print("✅ Libraries imported successfully")
✅ Libraries imported successfully
In [11]:
# Visualization settings
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")
pd.set_option('display.max_columns', None)

print("✅ Libraries imported successfully - including AWS S3 and Spark")
✅ Libraries imported successfully - including AWS S3 and Spark

=============================================================================

CELL 2: SPARK CONFIGURATION

=============================================================================

In [12]:
spark = (
    SparkSession.builder
    .appName("COVID19-México")
    .config("spark.jars.packages", 
            "org.apache.hadoop:hadoop-aws:3.3.2,com.amazonaws:aws-java-sdk-bundle:1.12.367")
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "com.amazonaws.auth.DefaultAWSCredentialsProviderChain")
    .config("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    .getOrCreate()
)

df = spark.read.parquet("s3a://xideralaws-curso-edgargu/covid19-data/covid19_mexico_s3fs.parquet")
df.printSchema()
df.show(3, truncate=False)
root
 |-- FECHA_SINTOMAS: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- FECHA_DEF: string (nullable = true)
 |-- SEXO: long (nullable = true)
 |-- EDAD: long (nullable = true)
 |-- TIPO_PACIENTE: long (nullable = true)
 |-- INTUBADO: long (nullable = true)
 |-- NEUMONIA: long (nullable = true)
 |-- DIABETES: long (nullable = true)
 |-- HIPERTENSION: long (nullable = true)
 |-- OBESIDAD: long (nullable = true)
 |-- ASMA: long (nullable = true)
 |-- EPOC: long (nullable = true)
 |-- ENTIDAD_RES: long (nullable = true)
 |-- CLASIFICACION_FINAL_COVID: long (nullable = true)

+--------------+-------------+----------+----+----+-------------+--------+--------+--------+------------+--------+----+----+-----------+-------------------------+
|FECHA_SINTOMAS|FECHA_INGRESO|FECHA_DEF |SEXO|EDAD|TIPO_PACIENTE|INTUBADO|NEUMONIA|DIABETES|HIPERTENSION|OBESIDAD|ASMA|EPOC|ENTIDAD_RES|CLASIFICACION_FINAL_COVID|
+--------------+-------------+----------+----+----+-------------+--------+--------+--------+------------+--------+----+----+-----------+-------------------------+
|2025-04-20    |2025-04-21   |9999-99-99|2   |8   |1            |97      |2       |2       |2           |2       |2   |2   |1          |6                        |
|2025-01-01    |2025-01-02   |9999-99-99|2   |23  |1            |97      |2       |2       |2           |2       |2   |2   |20         |6                        |
|2025-01-02    |2025-01-02   |9999-99-99|2   |18  |1            |97      |2       |2       |2           |2       |1   |2   |8          |6                        |
+--------------+-------------+----------+----+----+-------------+--------+--------+--------+------------+--------+----+----+-----------+-------------------------+
only showing top 3 rows
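Note in the sample above that FECHA_DEF uses the sentinel `9999-99-99` for patients with no recorded death. A minimal pandas sketch (hypothetical sample rows, not the real file) of turning that sentinel into a proper missing value before any date arithmetic:

```python
import pandas as pd

# Hypothetical sample mirroring the FECHA_DEF sentinel shown above
df = pd.DataFrame({"FECHA_DEF": ["9999-99-99", "2025-02-10", "9999-99-99"]})

# Map the "no death" sentinel to missing, then parse the real dates
fechas = df["FECHA_DEF"].replace("9999-99-99", pd.NA)
df["FECHA_DEF_DT"] = pd.to_datetime(fechas, errors="coerce")
df["FALLECIDO"] = df["FECHA_DEF_DT"].notna()  # True only for real death dates
```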

=============================================================================

CELL 3: AWS S3 CONFIGURATION

=============================================================================

In [13]:
s3_client = boto3.client("s3")

bucket_name = 'xideralaws-curso-edgargu'
object_key = 'covid19-data'

print("🔗 S3 configuration completed")
print(f"   Bucket: {bucket_name}")
print(f"   Key prefix: {object_key}")
🔗 S3 configuration completed
   Bucket: xideralaws-curso-edgargu
   Key prefix: covid19-data

=============================================================================

CELL 4: LOADING DATA FROM S3

=============================================================================

In [14]:
# Load data from S3
s3_path = f's3://{bucket_name}/{object_key}/COVID19MEXICO.csv'

try:
    # Try S3 first (a bare `except:` would also swallow KeyboardInterrupt,
    # so catch Exception explicitly)
    df = pd.read_csv(s3_path)
    print(f"✅ Data loaded from S3: {df.shape}")
except Exception:
    # Fall back to the local file
    df = pd.read_csv('COVID19MEXICO.csv')
    print(f"✅ Data loaded locally: {df.shape}")

print(f"Dataset shape: {df.shape[0]:,} rows x {df.shape[1]} columns")
✅ Data loaded locally: (104583, 42)
Dataset shape: 104,583 rows x 42 columns
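`s3fs` lets pandas read `s3://` paths directly, but `boto3` and `StringIO` are also imported above, so the download can be done explicitly as well. A hedged sketch (the bucket/key are whatever you pass in; assumes AWS credentials are resolvable from the environment):

```python
from io import StringIO
import pandas as pd

def csv_text_to_df(text: str) -> pd.DataFrame:
    """Parse CSV text already held in memory."""
    return pd.read_csv(StringIO(text))

def load_csv_from_s3(bucket: str, key: str) -> pd.DataFrame:
    """Fetch an object with boto3 and parse it as CSV.

    Assumes AWS credentials are available (env vars, profile, or role).
    """
    import boto3  # deferred so csv_text_to_df works without AWS deps
    obj = boto3.client("s3").get_object(Bucket=bucket, Key=key)
    return csv_text_to_df(obj["Body"].read().decode("utf-8"))
```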

=============================================================================

CELL 5: GENERAL DATASET INFORMATION

=============================================================================

In [15]:
print("="*80)
print("GENERAL DATASET INFORMATION")
print("="*80)

print(f"\nDataset shape: {df.shape}")
print(f"Columns: {list(df.columns)}")

print("\nBasic info:")
print(df.info())

print("\nFirst 5 rows:")
print(df.head())

print("\nDescriptive statistics:")
print(df.describe())
================================================================================
GENERAL DATASET INFORMATION
================================================================================

Dataset shape: (104583, 42)
Columns: ['FECHA_ACTUALIZACION', 'ID_REGISTRO', 'ORIGEN', 'SECTOR', 'ENTIDAD_UM', 'SEXO', 'ENTIDAD_NAC', 'ENTIDAD_RES', 'MUNICIPIO_RES', 'TIPO_PACIENTE', 'FECHA_INGRESO', 'FECHA_SINTOMAS', 'FECHA_DEF', 'INTUBADO', 'NEUMONIA', 'EDAD', 'NACIONALIDAD', 'EMBARAZO', 'HABLA_LENGUA_INDIG', 'INDIGENA', 'DIABETES', 'EPOC', 'ASMA', 'INMUSUPR', 'HIPERTENSION', 'OTRA_COM', 'CARDIOVASCULAR', 'OBESIDAD', 'RENAL_CRONICA', 'TABAQUISMO', 'OTRO_CASO', 'TOMA_MUESTRA_LAB', 'RESULTADO_PCR', 'RESULTADO_PCR_COINFECCION', 'TOMA_MUESTRA_ANTIGENO', 'RESULTADO_ANTIGENO', 'CLASIFICACION_FINAL_COVID', 'CLASIFICACION_FINAL_FLU', 'MIGRANTE', 'PAIS_NACIONALIDAD', 'PAIS_ORIGEN', 'UCI']

Basic info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 104583 entries, 0 to 104582
Data columns (total 42 columns):
 #   Column                     Non-Null Count   Dtype 
---  ------                     --------------   ----- 
 0   FECHA_ACTUALIZACION        104583 non-null  object
 1   ID_REGISTRO                104583 non-null  object
 2   ORIGEN                     104583 non-null  int64 
 3   SECTOR                     104583 non-null  int64 
 4   ENTIDAD_UM                 104583 non-null  int64 
 5   SEXO                       104583 non-null  int64 
 6   ENTIDAD_NAC                104583 non-null  int64 
 7   ENTIDAD_RES                104583 non-null  int64 
 8   MUNICIPIO_RES              104583 non-null  int64 
 9   TIPO_PACIENTE              104583 non-null  int64 
 10  FECHA_INGRESO              104583 non-null  object
 11  FECHA_SINTOMAS             104583 non-null  object
 12  FECHA_DEF                  104583 non-null  object
 13  INTUBADO                   104583 non-null  int64 
 14  NEUMONIA                   104583 non-null  int64 
 15  EDAD                       104583 non-null  int64 
 16  NACIONALIDAD               104583 non-null  int64 
 17  EMBARAZO                   104583 non-null  int64 
 18  HABLA_LENGUA_INDIG         104583 non-null  int64 
 19  INDIGENA                   104583 non-null  int64 
 20  DIABETES                   104583 non-null  int64 
 21  EPOC                       104583 non-null  int64 
 22  ASMA                       104583 non-null  int64 
 23  INMUSUPR                   104583 non-null  int64 
 24  HIPERTENSION               104583 non-null  int64 
 25  OTRA_COM                   104583 non-null  int64 
 26  CARDIOVASCULAR             104583 non-null  int64 
 27  OBESIDAD                   104583 non-null  int64 
 28  RENAL_CRONICA              104583 non-null  int64 
 29  TABAQUISMO                 104583 non-null  int64 
 30  OTRO_CASO                  104583 non-null  int64 
 31  TOMA_MUESTRA_LAB           104583 non-null  int64 
 32  RESULTADO_PCR              104583 non-null  int64 
 33  RESULTADO_PCR_COINFECCION  104583 non-null  int64 
 34  TOMA_MUESTRA_ANTIGENO      104583 non-null  int64 
 35  RESULTADO_ANTIGENO         104583 non-null  int64 
 36  CLASIFICACION_FINAL_COVID  104583 non-null  int64 
 37  CLASIFICACION_FINAL_FLU    104583 non-null  int64 
 38  MIGRANTE                   104583 non-null  int64 
 39  PAIS_NACIONALIDAD          104583 non-null  object
 40  PAIS_ORIGEN                104583 non-null  object
 41  UCI                        104583 non-null  int64 
dtypes: int64(35), object(7)
memory usage: 33.5+ MB
None

First 5 rows:
  FECHA_ACTUALIZACION ID_REGISTRO  ORIGEN  SECTOR  ENTIDAD_UM  SEXO  \
0          2025-08-19      167f1a       1      12           1     2   
1          2025-08-19     ga8a474       1       4          20     2   
2          2025-08-19     gb733da       1       6           8     2   
3          2025-08-19     g9ff7b3       1       4          32     1   
4          2025-08-19     g90b5c8       1       6          10     1   

   ENTIDAD_NAC  ENTIDAD_RES  MUNICIPIO_RES  TIPO_PACIENTE FECHA_INGRESO  \
0            1            1              3              1    2025-04-21   
1           20           20            413              1    2025-01-02   
2            8            8             37              1    2025-01-02   
3           32           32             17              2    2025-01-01   
4           10           10              5              2    2025-01-02   

  FECHA_SINTOMAS   FECHA_DEF  INTUBADO  NEUMONIA  EDAD  NACIONALIDAD  \
0     2025-04-20  9999-99-99        97         2     8             1   
1     2025-01-01  9999-99-99        97         2    23             1   
2     2025-01-02  9999-99-99        97         2    18             1   
3     2025-01-01  9999-99-99         2         2    24             1   
4     2025-01-02  9999-99-99         2         2    47             1   

   EMBARAZO  HABLA_LENGUA_INDIG  INDIGENA  DIABETES  EPOC  ASMA  INMUSUPR  \
0        97                   2         2         2     2     2         2   
1        97                   2         2         2     2     2         2   
2        97                   2         2         2     2     1         2   
3         2                   2         2         2     2     1         2   
4         2                   2         2         2     2     2         2   

   HIPERTENSION  OTRA_COM  CARDIOVASCULAR  OBESIDAD  RENAL_CRONICA  \
0             2         2               2         2              2   
1             2         2               2         2              2   
2             2         2               2         2              2   
3             2         2               2         2              2   
4             2         2               2         2              2   

   TABAQUISMO  OTRO_CASO  TOMA_MUESTRA_LAB  RESULTADO_PCR  \
0           2          2                 2            997   
1           2          2                 2            997   
2           2          2                 2            997   
3           2          2                 1              5   
4           2          2                 1              5   

   RESULTADO_PCR_COINFECCION  TOMA_MUESTRA_ANTIGENO  RESULTADO_ANTIGENO  \
0                        997                      2                  97   
1                        997                      2                  97   
2                        997                      2                  97   
3                          5                      2                  97   
4                          5                      2                  97   

   CLASIFICACION_FINAL_COVID  CLASIFICACION_FINAL_FLU  MIGRANTE  \
0                          6                        6        99   
1                          6                        6        99   
2                          6                        6        99   
3                          7                        7        99   
4                          7                        7        99   

  PAIS_NACIONALIDAD PAIS_ORIGEN  UCI  
0            México          97   97  
1            México          97   97  
2            México          97   97  
3            México          97    2  
4            México          97    2  

Descriptive statistics:
         ORIGEN         SECTOR     ENTIDAD_UM           SEXO    ENTIDAD_NAC  \
count  104583.0  104583.000000  104583.000000  104583.000000  104583.000000   
mean        1.0       8.757054      15.935735       1.443323      16.654887   
std         0.0       4.636364       8.481913       0.496780      10.061865   
min         1.0       2.000000       1.000000       1.000000       1.000000   
25%         1.0       4.000000       9.000000       1.000000       9.000000   
50%         1.0       6.000000      15.000000       1.000000      15.000000   
75%         1.0      12.000000      21.000000       2.000000      22.000000   
max         1.0      15.000000      32.000000       2.000000      99.000000   

         ENTIDAD_RES  MUNICIPIO_RES  TIPO_PACIENTE       INTUBADO  \
count  104583.000000  104583.000000  104583.000000  104583.000000   
mean       16.182114      44.680158       1.460926      53.199268   
std         8.397489      58.969891       0.498473      47.378509   
min         1.000000       1.000000       1.000000       1.000000   
25%         9.000000       8.000000       1.000000       2.000000   
50%        15.000000      20.000000       1.000000      97.000000   
75%        22.000000      57.000000       2.000000      97.000000   
max        32.000000     999.000000       2.000000      99.000000   

            NEUMONIA           EDAD   NACIONALIDAD       EMBARAZO  \
count  104583.000000  104583.000000  104583.000000  104583.000000   
mean        1.762026      36.606982       1.004542      44.276546   
std         0.425845      25.900712       0.067240      47.226776   
min         1.000000       0.000000       1.000000       1.000000   
25%         2.000000      13.000000       1.000000       2.000000   
50%         2.000000      34.000000       1.000000       2.000000   
75%         2.000000      56.000000       1.000000      97.000000   
max         2.000000     111.000000       2.000000      98.000000   

       HABLA_LENGUA_INDIG       INDIGENA       DIABETES           EPOC  \
count       104583.000000  104583.000000  104583.000000  104583.000000   
mean             5.100800       4.734144       1.990237       2.095388   
std             17.097612      16.105444       3.468442       3.528481   
min              1.000000       1.000000       1.000000       1.000000   
25%              2.000000       2.000000       2.000000       2.000000   
50%              2.000000       2.000000       2.000000       2.000000   
75%              2.000000       2.000000       2.000000       2.000000   
max             99.000000      99.000000      98.000000      98.000000   

                ASMA       INMUSUPR   HIPERTENSION       OTRA_COM  \
count  104583.000000  104583.000000  104583.000000  104583.000000   
mean        2.092864       2.055583       1.923534       2.712793   
std         3.566553       2.913621       2.889871       8.546264   
min         1.000000       1.000000       1.000000       1.000000   
25%         2.000000       2.000000       2.000000       2.000000   
50%         2.000000       2.000000       2.000000       2.000000   
75%         2.000000       2.000000       2.000000       2.000000   
max        98.000000      98.000000      98.000000      98.000000   

       CARDIOVASCULAR       OBESIDAD  RENAL_CRONICA     TABAQUISMO  \
count   104583.000000  104583.000000  104583.000000  104583.000000   
mean         2.060689       2.008309       2.050821       2.064590   
std          3.017742       2.716976       2.883812       3.258229   
min          1.000000       1.000000       1.000000       1.000000   
25%          2.000000       2.000000       2.000000       2.000000   
50%          2.000000       2.000000       2.000000       2.000000   
75%          2.000000       2.000000       2.000000       2.000000   
max         98.000000      98.000000      98.000000      98.000000   

          OTRO_CASO  TOMA_MUESTRA_LAB  RESULTADO_PCR  \
count  104583.00000     104583.000000  104583.000000   
mean        4.37552          1.334815     353.393037   
std        15.58371          0.471928     471.209645   
min         1.00000          1.000000       1.000000   
25%         2.00000          1.000000       5.000000   
50%         2.00000          1.000000       5.000000   
75%         2.00000          2.000000     997.000000   
max        99.00000          2.000000     999.000000   

       RESULTADO_PCR_COINFECCION  TOMA_MUESTRA_ANTIGENO  RESULTADO_ANTIGENO  \
count              104583.000000               104583.0            104583.0   
mean                  547.494277                    2.0                97.0   
std                   493.693981                    0.0                 0.0   
min                     1.000000                    2.0                97.0   
25%                     5.000000                    2.0                97.0   
50%                   997.000000                    2.0                97.0   
75%                   997.000000                    2.0                97.0   
max                   999.000000                    2.0                97.0   

       CLASIFICACION_FINAL_COVID  CLASIFICACION_FINAL_FLU       MIGRANTE  \
count              104583.000000            104583.000000  104583.000000   
mean                    6.381783                 6.187067      98.609439   
std                     0.982804                 1.210828       6.150734   
min                     3.000000                 3.000000       1.000000   
25%                     6.000000                 6.000000      99.000000   
50%                     7.000000                 7.000000      99.000000   
75%                     7.000000                 7.000000      99.000000   
max                     7.000000                 7.000000      99.000000   

                 UCI  
count  104583.000000  
mean       53.206707  
std        47.370390  
min         1.000000  
25%         2.000000  
50%        97.000000  
75%        97.000000  
max        99.000000  

=============================================================================

CELL 6: CONVERSION TO SPARK DATAFRAME

=============================================================================

In [16]:
# Create the Spark DataFrame from pandas
spark_df = spark.createDataFrame(df)

print("Spark DataFrame created")
print(f"Number of partitions: {spark_df.rdd.getNumPartitions()}")

# Show the schema
spark_df.printSchema()

# Cache the DataFrame
spark_df.cache()
print("DataFrame cached for better performance")
Spark DataFrame created
Number of partitions: 2
root
 |-- FECHA_ACTUALIZACION: string (nullable = true)
 |-- ID_REGISTRO: string (nullable = true)
 |-- ORIGEN: long (nullable = true)
 |-- SECTOR: long (nullable = true)
 |-- ENTIDAD_UM: long (nullable = true)
 |-- SEXO: long (nullable = true)
 |-- ENTIDAD_NAC: long (nullable = true)
 |-- ENTIDAD_RES: long (nullable = true)
 |-- MUNICIPIO_RES: long (nullable = true)
 |-- TIPO_PACIENTE: long (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- FECHA_SINTOMAS: string (nullable = true)
 |-- FECHA_DEF: string (nullable = true)
 |-- INTUBADO: long (nullable = true)
 |-- NEUMONIA: long (nullable = true)
 |-- EDAD: long (nullable = true)
 |-- NACIONALIDAD: long (nullable = true)
 |-- EMBARAZO: long (nullable = true)
 |-- HABLA_LENGUA_INDIG: long (nullable = true)
 |-- INDIGENA: long (nullable = true)
 |-- DIABETES: long (nullable = true)
 |-- EPOC: long (nullable = true)
 |-- ASMA: long (nullable = true)
 |-- INMUSUPR: long (nullable = true)
 |-- HIPERTENSION: long (nullable = true)
 |-- OTRA_COM: long (nullable = true)
 |-- CARDIOVASCULAR: long (nullable = true)
 |-- OBESIDAD: long (nullable = true)
 |-- RENAL_CRONICA: long (nullable = true)
 |-- TABAQUISMO: long (nullable = true)
 |-- OTRO_CASO: long (nullable = true)
 |-- TOMA_MUESTRA_LAB: long (nullable = true)
 |-- RESULTADO_PCR: long (nullable = true)
 |-- RESULTADO_PCR_COINFECCION: long (nullable = true)
 |-- TOMA_MUESTRA_ANTIGENO: long (nullable = true)
 |-- RESULTADO_ANTIGENO: long (nullable = true)
 |-- CLASIFICACION_FINAL_COVID: long (nullable = true)
 |-- CLASIFICACION_FINAL_FLU: long (nullable = true)
 |-- MIGRANTE: long (nullable = true)
 |-- PAIS_NACIONALIDAD: string (nullable = true)
 |-- PAIS_ORIGEN: string (nullable = true)
 |-- UCI: long (nullable = true)

DataFrame cached for better performance
25/08/29 05:24:35 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
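The WARN above (truncated plan string) and the "task of very large size" warnings in the next cell stem from building the Spark DataFrame out of a pandas DataFrame held on the driver, so every task ships a copy of the rows. A hedged configuration sketch of the usual mitigations, raising `spark.sql.debug.maxToStringFields` and letting Spark read the CSV itself (the exact S3 object key is an assumption):

```python
from pyspark.sql import SparkSession

# Sketch: silence the plan-truncation warning and read the CSV with Spark
# directly, so rows are never serialized from the driver into each task.
spark = (
    SparkSession.builder
    .appName("COVID19-México")
    .config("spark.sql.debug.maxToStringFields", "100")
    .getOrCreate()
)

# Assumed object key; adjust to wherever COVID19MEXICO.csv actually lives
spark_df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("s3a://xideralaws-curso-edgargu/covid19-data/COVID19MEXICO.csv")
)
```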

=============================================================================

CELL 7: SPARK ANALYSIS

=============================================================================

In [17]:
print("="*60)
print("SPARK ANALYSIS")
print("="*60)

# Count total records
total_registros = spark_df.count()
print(f"Total records: {total_registros:,}")

# Breakdown by sex
print("\nDistribution by SEXO:")
spark_df.groupBy("SEXO").count().orderBy(desc("count")).show()

# Breakdown by patient type
print("\nDistribution by TIPO_PACIENTE:")
spark_df.groupBy("TIPO_PACIENTE").count().orderBy(desc("count")).show()

# Age statistics
print("\nEDAD statistics:")
spark_df.select("EDAD").describe().show()
============================================================
SPARK ANALYSIS
============================================================
25/08/29 05:24:35 WARN TaskSetManager: Stage 6 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
25/08/29 05:24:40 WARN TaskSetManager: Stage 7 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
                                                                                
Total records: 104,583

Distribution by SEXO:
25/08/29 05:24:41 WARN TaskSetManager: Stage 10 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+----+-----+
|SEXO|count|
+----+-----+
|   1|58219|
|   2|46364|
+----+-----+


Distribution by TIPO_PACIENTE:
25/08/29 05:24:42 WARN TaskSetManager: Stage 13 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+-------------+-----+
|TIPO_PACIENTE|count|
+-------------+-----+
|            1|56378|
|            2|48205|
+-------------+-----+


EDAD statistics:
25/08/29 05:24:42 WARN TaskSetManager: Stage 16 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
+-------+------------------+
|summary|              EDAD|
+-------+------------------+
|  count|            104583|
|   mean| 36.60698201428531|
| stddev|25.900712029021644|
|    min|                 0|
|    max|               111|
+-------+------------------+

=============================================================================

CELL 8: DATA CLEANING AND TRANSFORMATION

=============================================================================

In [18]:
print("="*60)
print("DATA CLEANING AND TRANSFORMATION")
print("="*60)

# Check for nulls
print("Null values per column:")
from pyspark.sql.functions import col, isnan, when, count

null_counts = spark_df.select([count(when(col(c).isNull(), c)).alias(c) for c in spark_df.columns])
null_counts.show()

# Clean the data
df_clean = spark_df.filter(
    (col("EDAD").isNotNull()) &
    (col("SEXO").isin([1, 2])) &
    (col("TIPO_PACIENTE").isin([1, 2]))
)

print(f"Records after cleaning: {df_clean.count():,}")

# Create age buckets
df_clean = df_clean.withColumn("GRUPO_EDAD",
    when(col("EDAD") < 18, "0-17")
    .when(col("EDAD") < 30, "18-29")
    .when(col("EDAD") < 45, "30-44")
    .when(col("EDAD") < 60, "45-59")
    .when(col("EDAD") < 75, "60-74")
    .otherwise("75+")
)

print("Age buckets created")
============================================================
DATA CLEANING AND TRANSFORMATION
============================================================
Null values per column:
25/08/29 05:24:43 WARN TaskSetManager: Stage 19 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
                                                                                
+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+---------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+-------------------------+---------------------+------------------+-------------------------+-----------------------+--------+-----------------+-----------+---+
|FECHA_ACTUALIZACION|ID_REGISTRO|ORIGEN|SECTOR|ENTIDAD_UM|SEXO|ENTIDAD_NAC|ENTIDAD_RES|MUNICIPIO_RES|TIPO_PACIENTE|FECHA_INGRESO|FECHA_SINTOMAS|FECHA_DEF|INTUBADO|NEUMONIA|EDAD|NACIONALIDAD|EMBARAZO|HABLA_LENGUA_INDIG|INDIGENA|DIABETES|EPOC|ASMA|INMUSUPR|HIPERTENSION|OTRA_COM|CARDIOVASCULAR|OBESIDAD|RENAL_CRONICA|TABAQUISMO|OTRO_CASO|TOMA_MUESTRA_LAB|RESULTADO_PCR|RESULTADO_PCR_COINFECCION|TOMA_MUESTRA_ANTIGENO|RESULTADO_ANTIGENO|CLASIFICACION_FINAL_COVID|CLASIFICACION_FINAL_FLU|MIGRANTE|PAIS_NACIONALIDAD|PAIS_ORIGEN|UCI|
+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+---------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+-------------------------+---------------------+------------------+-------------------------+-----------------------+--------+-----------------+-----------+---+
|                  0|          0|     0|     0|         0|   0|          0|          0|            0|            0|            0|             0|        0|       0|       0|   0|           0|       0|                 0|       0|       0|   0|   0|       0|           0|       0|             0|       0|            0|         0|        0|               0|            0|                        0|                    0|                 0|                        0|                      0|       0|                0|          0|  0|
+-------------------+-----------+------+------+----------+----+-----------+-----------+-------------+-------------+-------------+--------------+---------+--------+--------+----+------------+--------+------------------+--------+--------+----+----+--------+------------+--------+--------------+--------+-------------+----------+---------+----------------+-------------+-------------------------+---------------------+------------------+-------------------------+-----------------------+--------+-----------------+-----------+---+

25/08/29 05:24:44 WARN TaskSetManager: Stage 22 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Records after cleaning: 104,583
Age buckets created
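The null check above reports zero nulls because this dataset encodes missingness with catalog sentinels rather than real nulls (97 = does not apply, 98 = ignored, 99 = not specified; an assumption based on the DGE data dictionary, not stated in this notebook). A minimal pandas sketch with hypothetical rows showing how to unmask them before computing comorbidity rates:

```python
import pandas as pd

# Catalog sentinel codes (assumed meanings: 97 does not apply,
# 98 ignored, 99 not specified)
SENTINELS = [97, 98, 99]

# Hypothetical sample with the same 1/2/sentinel coding as DIABETES / INTUBADO
df = pd.DataFrame({"DIABETES": [1, 2, 98], "INTUBADO": [97, 2, 99]})

# Replace sentinel codes with real missing values
cleaned = df.mask(df.isin(SENTINELS))
```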

=============================================================================

CELL 9: VISUALIZATION STYLE CONFIGURATION

=============================================================================

In [19]:
# matplotlib and seaborn configuration
plt.style.use('default')
sns.set_style("whitegrid")
sns.set_palette("husl")

# Size settings
plt.rcParams['figure.figsize'] = (12, 8)
plt.rcParams['axes.titlesize'] = 16
plt.rcParams['axes.labelsize'] = 12

print("Visualization settings applied")
Visualization settings applied

=============================================================================

CELL 10: Visualization 1 - Case Distribution by Age Group

=============================================================================

In [20]:
# Convert to pandas for plotting
edad_data = df_clean.groupBy("GRUPO_EDAD").count().toPandas().sort_values("GRUPO_EDAD")

plt.figure(figsize=(12, 6))
bars = plt.bar(edad_data['GRUPO_EDAD'], edad_data['count'], color='skyblue', alpha=0.7)
plt.title('Case Distribution by Age Group')
plt.xlabel('Age Group')
plt.ylabel('Number of Cases')

# Add value labels above the bars
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f'{int(height):,}', ha='center', va='bottom')

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print(f"Age groups analyzed: {len(edad_data)}")
25/08/29 05:24:58 WARN TaskSetManager: Stage 25 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
[Figure: bar chart of cases by age group]
Age groups analyzed: 6
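The age buckets built with Spark's `when()` chain in the cleaning cell have a compact pandas equivalent via `pd.cut`; a sketch with hypothetical ages:

```python
import pandas as pd

# Same buckets as the Spark when() chain, built with pd.cut
edades = pd.Series([8, 23, 35, 50, 67, 80])
grupo = pd.cut(
    edades,
    bins=[-1, 17, 29, 44, 59, 74, 200],  # right-inclusive edges
    labels=["0-17", "18-29", "30-44", "45-59", "60-74", "75+"],
)
```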

=============================================================================

CELL 11: Visualization 2 - Top 10 Most Affected States

=============================================================================

In [21]:
# Top 10 entities with the most cases
top_entidades = df_clean.groupBy("ENTIDAD_RES").count().orderBy(desc("count")).limit(10).toPandas()

plt.figure(figsize=(12, 8))
# Cast codes to str so each entity gets its own categorical bar instead of
# a numeric y position
bars = plt.barh(top_entidades['ENTIDAD_RES'].astype(str), top_entidades['count'], color='lightcoral', alpha=0.7)
plt.title('Top 10 Entities with the Most Cases')
plt.xlabel('Number of Cases')
plt.ylabel('Entity Code')

# Add value labels
for i, bar in enumerate(bars):
    width = bar.get_width()
    plt.text(width, bar.get_y() + bar.get_height()/2.,
             f'{int(width):,}', ha='left', va='center')

plt.tight_layout()
plt.show()

print("Top 10 entities shown")
25/08/29 05:25:03 WARN TaskSetManager: Stage 28 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
[Figure: horizontal bar chart of top 10 entities by case count]
Top 10 entities shown

=============================================================================

CELL 12: Visualization 3 - Distribution by Patient Type

=============================================================================

In [22]:
# Distribution by patient type
tipo_data = df_clean.groupBy("TIPO_PACIENTE").count().toPandas()

# Map coded values to labels
tipo_data['TIPO_DESC'] = tipo_data['TIPO_PACIENTE'].map({1: 'Outpatient', 2: 'Hospitalized'})

plt.figure(figsize=(10, 6))
plt.pie(tipo_data['count'], labels=tipo_data['TIPO_DESC'], autopct='%1.1f%%',
        colors=['lightgreen', 'salmon'])
plt.title('Distribution by Patient Type')
plt.axis('equal')
plt.show()

print("Patient-type distribution shown")
25/08/29 05:25:07 WARN TaskSetManager: Stage 31 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
[Figure: pie chart of patient types]
Patient-type distribution shown

=============================================================================

CELL 13: Visualization 4 - Cases by Sex

=============================================================================

In [23]:
# Distribución por sexo
sexo_data = df_clean.groupBy("SEXO").count().toPandas()
sexo_data['SEXO_DESC'] = sexo_data['SEXO'].map({1: 'Mujer', 2: 'Hombre'})

plt.figure(figsize=(8, 6))
plt.pie(sexo_data['count'], labels=sexo_data['SEXO_DESC'], autopct='%1.1f%%',
        colors=['pink', 'lightblue'])
plt.title('Distribución de Casos por Sexo')
plt.axis('equal')
plt.show()

print("Distribución por sexo mostrada")
25/08/29 05:25:11 WARN TaskSetManager: Stage 34 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por sexo mostrada

=============================================================================¶

CELDA 14: Visualización 5 - Distribución de Edad¶

=============================================================================¶

In [24]:
# Histograma de edades
edad_pandas = df_clean.select("EDAD").toPandas()

plt.figure(figsize=(12, 6))
plt.hist(edad_pandas['EDAD'], bins=50, color='lightblue', alpha=0.7, edgecolor='black')
plt.title('Distribución de Edad de los Pacientes')
plt.xlabel('Edad')
plt.ylabel('Frecuencia')
plt.grid(axis='y', alpha=0.3)
plt.show()

print("Estadísticas de edad:")
print(f"Edad promedio: {edad_pandas['EDAD'].mean():.2f}")
print(f"Mediana: {edad_pandas['EDAD'].median():.2f}")
25/08/29 05:25:13 WARN TaskSetManager: Stage 37 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Estadísticas de edad:
Edad promedio: 36.61
Mediana: 34.00
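GRUPO_EDAD (usado más adelante en el mapa de calor) se construyó en una celda anterior no mostrada aquí. Un boceto hipotético de cómo derivarlo de EDAD con `pd.cut` (los cortes y etiquetas son ilustrativos, no los del análisis original):

```python
import pandas as pd

def agrupar_edad(edades: pd.Series) -> pd.Series:
    """Discretiza la edad en grupos etarios; cortes ilustrativos."""
    cortes = [0, 17, 29, 44, 59, 150]
    etiquetas = ["0-17", "18-29", "30-44", "45-59", "60+"]
    return pd.cut(edades, bins=cortes, labels=etiquetas, include_lowest=True)

print(agrupar_edad(pd.Series([5, 25, 36, 70])).tolist())
# → ['0-17', '18-29', '30-44', '60+']
```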

=============================================================================¶

CELDA 15: Visualización 6 - Casos por Sector¶

=============================================================================¶

In [25]:
# Distribución por sector
sector_data = df_clean.groupBy("SECTOR").count().orderBy(desc("count")).limit(10).toPandas()

plt.figure(figsize=(12, 6))
bars = plt.bar(sector_data['SECTOR'].astype(str), sector_data['count'],
               color='gold', alpha=0.7)
plt.title('Casos por Sector (Top 10)')
plt.xlabel('Sector')
plt.ylabel('Número de Casos')

# Añadir valores
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f'{int(height):,}', ha='center', va='bottom')

plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

print("Distribución por sector mostrada")
25/08/29 05:25:17 WARN TaskSetManager: Stage 38 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por sector mostrada

=============================================================================¶

CELDA 16: Visualización 7 - Análisis Temporal¶

=============================================================================¶

In [26]:
# Análisis temporal por fecha de síntomas
temporal_data = df_clean.groupBy("FECHA_SINTOMAS").count().orderBy("FECHA_SINTOMAS").toPandas()

# Tomar muestra para visualización
if len(temporal_data) > 100:
    # random_state fijo para que la muestra sea reproducible entre ejecuciones
    temporal_sample = temporal_data.sample(n=100, random_state=42).sort_values('FECHA_SINTOMAS')
else:
    temporal_sample = temporal_data

plt.figure(figsize=(14, 6))
plt.plot(range(len(temporal_sample)), temporal_sample['count'],
         marker='o', linewidth=2, markersize=4, color='red', alpha=0.7)
plt.title('Evolución Temporal de Casos')
plt.xlabel('Tiempo (Muestra)')
plt.ylabel('Número de Casos')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print(f"Análisis temporal con {len(temporal_sample)} puntos de datos")
25/08/29 05:25:22 WARN TaskSetManager: Stage 41 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Análisis temporal con 100 puntos de datos
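El muestreo aleatorio de 100 fechas omite días al azar y puede distorsionar la forma de la curva. Una alternativa hipotética (no la empleada arriba) es agregar por semana con pandas antes de graficar, lo que reduce puntos preservando la tendencia:

```python
import pandas as pd

def casos_por_semana(temporal: pd.DataFrame) -> pd.DataFrame:
    """Suma los conteos diarios por semana calendario (semana terminando en domingo)."""
    t = temporal.copy()
    t["FECHA_SINTOMAS"] = pd.to_datetime(t["FECHA_SINTOMAS"])
    return (t.set_index("FECHA_SINTOMAS")["count"]
             .resample("W").sum()
             .reset_index())

demo = pd.DataFrame({
    "FECHA_SINTOMAS": ["2020-03-02", "2020-03-03", "2020-03-10"],
    "count": [5, 7, 4],
})
semanal = casos_por_semana(demo)
print(len(semanal), semanal["count"].sum())
# → 2 16  (dos semanas: 5+7 y 4)
```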

=============================================================================¶

CELDA 17: Visualización 8 - Mapa de Calor por Edad y Sexo¶

=============================================================================¶

In [27]:
# Crear tabla cruzada edad-sexo
edad_sexo = df_clean.groupBy("GRUPO_EDAD", "SEXO").count().toPandas()
pivot_table = edad_sexo.pivot(index='GRUPO_EDAD', columns='SEXO', values='count').fillna(0)

plt.figure(figsize=(10, 6))
sns.heatmap(pivot_table, annot=True, fmt='.0f', cmap='YlOrRd',
            cbar_kws={'label': 'Número de Casos'})
plt.title('Mapa de Calor: Casos por Edad y Sexo')
plt.ylabel('Grupo de Edad')
plt.xlabel('Sexo (1=Mujer, 2=Hombre)')
plt.tight_layout()
plt.show()

print("Mapa de calor generado")
25/08/29 05:25:27 WARN TaskSetManager: Stage 49 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Mapa de calor generado

=============================================================================¶

CELDA 18: Visualización 9 - Casos por Origen¶

=============================================================================¶

In [28]:
# Distribución por origen
origen_data = df_clean.groupBy("ORIGEN").count().orderBy(desc("count")).toPandas()

plt.figure(figsize=(10, 6))
bars = plt.bar(origen_data['ORIGEN'].astype(str), origen_data['count'],
               color='purple', alpha=0.7)
plt.title('Distribución de Casos por Origen')
plt.xlabel('Origen')
plt.ylabel('Número de Casos')

# Añadir valores
for bar in bars:
    height = bar.get_height()
    plt.text(bar.get_x() + bar.get_width()/2., height,
             f'{int(height):,}', ha='center', va='bottom')

plt.tight_layout()
plt.show()

print("Distribución por origen mostrada")
25/08/29 05:25:31 WARN TaskSetManager: Stage 52 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Distribución por origen mostrada

=============================================================================¶

CELDA 19: Análisis Estadístico Avanzado¶

=============================================================================¶

In [29]:
print("="*60)
print("ANÁLISIS ESTADÍSTICO AVANZADO")
print("="*60)

# Convertir datos necesarios a pandas
analisis_data = df_clean.select("EDAD", "SEXO", "TIPO_PACIENTE", "ORIGEN").toPandas()

# Correlaciones
correlacion = analisis_data.corr()
print("Matriz de correlación:")
print(correlacion)

# Estadísticas por grupo
print("\nEstadísticas por sexo:")
print(analisis_data.groupby('SEXO')['EDAD'].describe())

print("\nEstadísticas por tipo de paciente:")
print(analisis_data.groupby('TIPO_PACIENTE')['EDAD'].describe())
============================================================
ANÁLISIS ESTADÍSTICO AVANZADO
============================================================
25/08/29 05:25:37 WARN TaskSetManager: Stage 60 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Matriz de correlación:
                   EDAD      SEXO  TIPO_PACIENTE  ORIGEN
EDAD           1.000000 -0.073727       0.063046     NaN
SEXO          -0.073727  1.000000       0.131349     NaN
TIPO_PACIENTE  0.063046  0.131349       1.000000     NaN
ORIGEN              NaN       NaN            NaN     NaN

Estadísticas por sexo:
        count       mean        std  min   25%   50%   75%    max
SEXO                                                             
1     58219.0  38.311084  24.815771  0.0  20.0  37.0  57.0  110.0
2     46364.0  34.467151  27.050506  0.0   8.0  31.0  56.0  111.0

Estadísticas por tipo de paciente:
                 count       mean        std  min   25%   50%   75%    max
TIPO_PACIENTE                                                             
1              56378.0  35.097041  20.093828  0.0  22.0  34.0  49.0  106.0
2              48205.0  38.372928  31.264116  0.0   5.0  38.0  67.0  111.0
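La fila y columna de ORIGEN aparecen como NaN porque, en este subconjunto, la variable toma un único valor: con varianza cero la correlación de Pearson no está definida. Un boceto para excluir columnas constantes antes de calcular la matriz:

```python
import pandas as pd

def corr_sin_constantes(df: pd.DataFrame) -> pd.DataFrame:
    """Descarta columnas con un solo valor distinto, que producirían NaN en corr()."""
    variables = df.loc[:, df.nunique(dropna=True) > 1]
    return variables.corr()

demo = pd.DataFrame({"EDAD": [20, 40, 60], "SEXO": [1, 2, 1], "ORIGEN": [2, 2, 2]})
print(list(corr_sin_constantes(demo).columns))
# → ['EDAD', 'SEXO']
```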

=============================================================================¶

CELDA 20: Machine Learning - Clustering¶

=============================================================================¶

In [30]:
# Preparar datos para clustering
ml_data = df_clean.select("EDAD", "SEXO", "TIPO_PACIENTE", "ORIGEN").toPandas().dropna()

# Normalizar datos
scaler = StandardScaler()
data_scaled = scaler.fit_transform(ml_data)

# Aplicar K-means
kmeans = KMeans(n_clusters=3, n_init=10, random_state=42)  # n_init explícito: reproducible entre versiones de sklearn
clusters = kmeans.fit_predict(data_scaled)

# Visualizar clusters
plt.figure(figsize=(10, 6))
plt.scatter(ml_data['EDAD'], ml_data['SEXO'], c=clusters, cmap='viridis', alpha=0.6)
plt.title('Clustering de Pacientes')
plt.xlabel('Edad')
plt.ylabel('Sexo')
plt.colorbar()
plt.show()

print(f"Clustering completado con {len(set(clusters))} grupos")
25/08/29 05:25:43 WARN TaskSetManager: Stage 61 contains a task of very large size (4990 KiB). The maximum recommended task size is 1000 KiB.
Clustering completado con 3 grupos
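El k=3 se fijó a priori. Un boceto complementario (no parte del análisis original) del método del codo sobre datos sintéticos, para justificar la elección de k observando dónde la inercia deja de caer:

```python
import numpy as np
from sklearn.cluster import KMeans

# Tres grupos sintéticos bien separados en 2D
rng = np.random.default_rng(42)
X = np.vstack([rng.normal(c, 0.3, size=(50, 2)) for c in (0, 3, 6)])

# Inercia de K-means para k = 1..6; el "codo" está donde la curva se aplana
inercias = [KMeans(n_clusters=k, n_init=10, random_state=42).fit(X).inertia_
            for k in range(1, 7)]

print(inercias[2] < inercias[0])  # la inercia cae claramente hasta k=3
# → True
```

En los datos reales bastaría sustituir `X` por `data_scaled` y graficar `inercias` contra k.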

=============================================================================¶

CELDA 21: PCA - Análisis de Componentes Principales¶

=============================================================================¶

In [31]:
# PCA
pca = PCA(n_components=2)
pca_result = pca.fit_transform(data_scaled)

plt.figure(figsize=(10, 6))
plt.scatter(pca_result[:, 0], pca_result[:, 1], alpha=0.6, c=ml_data['EDAD'], cmap='viridis')
plt.title('Análisis de Componentes Principales')
plt.xlabel(f'PC1 ({pca.explained_variance_ratio_[0]:.2%} varianza)')
plt.ylabel(f'PC2 ({pca.explained_variance_ratio_[1]:.2%} varianza)')
plt.colorbar(label='Edad')
plt.show()

print(f"Varianza explicada: PC1={pca.explained_variance_ratio_[0]:.2%}, PC2={pca.explained_variance_ratio_[1]:.2%}")
Varianza explicada: PC1=37.73%, PC2=35.02%
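Con PC1 y PC2 explicando en conjunto apenas ~73% de la varianza, conviene inspeccionar las cargas (loadings) para interpretar qué variable domina cada componente. Boceto con datos sintéticos y nombres de columna ilustrativos (no los resultados del análisis):

```python
import numpy as np
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
X = rng.normal(size=(200, 4))
X[:, 1] = 0.9 * X[:, 0] + rng.normal(scale=0.1, size=200)  # par correlacionado

pca = PCA(n_components=2).fit(X)
columnas = ["EDAD", "SEXO", "TIPO_PACIENTE", "ORIGEN"]  # nombres ilustrativos
for i, comp in enumerate(pca.components_, 1):
    dominante = columnas[int(np.argmax(np.abs(comp)))]
    print(f"PC{i}: variable dominante = {dominante}")
```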

=============================================================================¶

CELDA 23: Conversión a Formato Parquet¶

=============================================================================¶

In [32]:
print("\n=== 🔧 PREPARACIÓN PARA SPARK JOBS  ===")

# Seleccionar solo columnas esenciales para análisis
columnas_esenciales = [
    'FECHA_SINTOMAS', 'FECHA_INGRESO', 'FECHA_DEF',
    'SEXO', 'EDAD', 'GRUPO_EDAD', 'TIPO_PACIENTE',
    'INTUBADO', 'NEUMONIA', 'DIABETES', 'HIPERTENSION',
    'OBESIDAD', 'ASMA', 'EPOC', 'DEFUNCION',
    'ENTIDAD_RES', 'CLASIFICACION_FINAL_COVID'
]

# Filtrar columnas que existen en el dataset
columnas_disponibles = [col for col in columnas_esenciales if col in df.columns]
df_optimized = df[columnas_disponibles].copy()

print(f"📊 Dataset optimizado: {len(columnas_disponibles)} columnas seleccionadas")
print("Columnas incluidas en el análisis:")
for i, col in enumerate(columnas_disponibles, 1):
    print(f"   {i}. {col}")

# =============================================================================
#  Manejo apropiado de valores faltantes para Spark
# =============================================================================

print("\n🔧 Aplicando correcciones para compatibilidad con Spark...")

df_for_spark = df_optimized.copy()

for col in df_for_spark.columns:
    if df_for_spark[col].dtype == 'object' or df_for_spark[col].dtype.name == 'category':
        df_for_spark[col] = df_for_spark[col].fillna('Unknown').astype(str)
    elif df_for_spark[col].dtype in ['datetime64[ns]', 'datetime64[ns, UTC]']:
        df_for_spark[col] = df_for_spark[col].astype(str)
    elif df_for_spark[col].dtype == 'bool':
        df_for_spark[col] = df_for_spark[col].astype(str)
    else:
        df_for_spark[col] = df_for_spark[col].fillna(-999)

print("✅ Correcciones aplicadas exitosamente")

# Convertir a Spark DataFrame 
try:
    df_spark_optimized = spark.createDataFrame(df_for_spark)
    print("✅ DataFrame convertido a Spark exitosamente")

    # Mostrar información del DataFrame de Spark
    print(f"\n📈 Información Spark DataFrame:")
    print(f"   Número de particiones: {df_spark_optimized.rdd.getNumPartitions()}")
    print(f"   Número de registros: {df_spark_optimized.count():,}")

    # Mostrar esquema
    print(f"\n📋 Esquema del DataFrame Spark:")
    df_spark_optimized.printSchema()

except Exception as e:
    print(f"❌ Error al convertir a Spark DataFrame: {e}")
    print("💡 Usando DataFrame de pandas como alternativa")
    df_spark_optimized = None
=== 🔧 PREPARACIÓN PARA SPARK JOBS  ===
📊 Dataset optimizado: 15 columnas seleccionadas
Columnas incluidas en el análisis:
   1. FECHA_SINTOMAS
   2. FECHA_INGRESO
   3. FECHA_DEF
   4. SEXO
   5. EDAD
   6. TIPO_PACIENTE
   7. INTUBADO
   8. NEUMONIA
   9. DIABETES
   10. HIPERTENSION
   11. OBESIDAD
   12. ASMA
   13. EPOC
   14. ENTIDAD_RES
   15. CLASIFICACION_FINAL_COVID

🔧 Aplicando correcciones para compatibilidad con Spark...
✅ Correcciones aplicadas exitosamente
✅ DataFrame convertido a Spark exitosamente

📈 Información Spark DataFrame:
   Número de particiones: 2
25/08/29 05:26:04 WARN TaskSetManager: Stage 62 contains a task of very large size (1765 KiB). The maximum recommended task size is 1000 KiB.
   Número de registros: 104,583

📋 Esquema del DataFrame Spark:
root
 |-- FECHA_SINTOMAS: string (nullable = true)
 |-- FECHA_INGRESO: string (nullable = true)
 |-- FECHA_DEF: string (nullable = true)
 |-- SEXO: long (nullable = true)
 |-- EDAD: long (nullable = true)
 |-- TIPO_PACIENTE: long (nullable = true)
 |-- INTUBADO: long (nullable = true)
 |-- NEUMONIA: long (nullable = true)
 |-- DIABETES: long (nullable = true)
 |-- HIPERTENSION: long (nullable = true)
 |-- OBESIDAD: long (nullable = true)
 |-- ASMA: long (nullable = true)
 |-- EPOC: long (nullable = true)
 |-- ENTIDAD_RES: long (nullable = true)
 |-- CLASIFICACION_FINAL_COVID: long (nullable = true)
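El esquema muestra las fechas como string y los numéricos con el centinela -999 (consecuencia de las correcciones de compatibilidad aplicadas arriba). Boceto para revertir ambas convenciones al releer los datos con pandas; supone exactamente las convenciones de la celda anterior:

```python
import numpy as np
import pandas as pd

def restaurar_tipos(df: pd.DataFrame,
                    cols_fecha=("FECHA_SINTOMAS", "FECHA_INGRESO", "FECHA_DEF")) -> pd.DataFrame:
    """Revierte el centinela -999 a NaN y las fechas en texto a datetime."""
    out = df.replace(-999, np.nan)
    for c in cols_fecha:
        if c in out.columns:
            # 'Unknown' y otros textos no fecha quedan como NaT
            out[c] = pd.to_datetime(out[c], errors="coerce")
    return out

demo = pd.DataFrame({"EDAD": [34, -999], "FECHA_DEF": ["2020-05-01", "Unknown"]})
r = restaurar_tipos(demo)
print(int(r["EDAD"].isna().sum()), int(r["FECHA_DEF"].isna().sum()))
# → 1 1
```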

                                                                                

=============================================================================¶

CELDA 24: Subir Resultado a S3¶

=============================================================================¶

In [33]:
print("\n=== 💾 EXPORTACIÓN A PARQUET S3 ===")

# Configuración de S3
S3_BUCKET = "xideralaws-curso-edgargu"
S3_PREFIX = "covid19-data"  # Carpeta dentro del bucket (opcional)

try:
    import boto3
    from botocore.exceptions import NoCredentialsError, ClientError
    import io

    # Cliente S3; se asume que las credenciales AWS ya están configuradas
    # en el entorno (aws configure, variables de entorno o rol IAM)
    s3_client = boto3.client("s3")

    # Método 1: Exportar con pandas a S3
    print(f"\n📤 Método 1: Pandas → S3")
    
    # Convertir DataFrame a Parquet en memoria
    parquet_buffer = io.BytesIO()
    df_for_spark.to_parquet(parquet_buffer, engine='pyarrow', compression='snappy', index=False)
    parquet_buffer.seek(0)
    
    # Subir a S3
    s3_key_pandas = f"{S3_PREFIX}/covid19_mexico_pandas.parquet"
    s3_client.upload_fileobj(parquet_buffer, S3_BUCKET, s3_key_pandas)
    
    # Obtener información del objeto
    response = s3_client.head_object(Bucket=S3_BUCKET, Key=s3_key_pandas)
    size_mb = response['ContentLength'] / (1024*1024)
    
    print(f"✅ Dataset exportado con pandas a S3")
    print(f"📁 Bucket: s3://{S3_BUCKET}/{s3_key_pandas}")
    print(f"📦 Tamaño archivo: {size_mb:.2f} MB")
    print(f"📅 Última modificación: {response['LastModified']}")
    
    # Método 2: Usar Spark para exportar directamente a S3
    if 'df_spark_optimized' in locals() and df_spark_optimized is not None:
        try:
            print(f"\n📤 Método 2: Spark → S3")
            
            # Configurar Spark para S3 (si no está configurado)
            spark.conf.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
            spark.conf.set("spark.hadoop.fs.s3a.fast.upload", "true")
            
            # Ruta S3 para Spark (usa s3a://)
            s3_path_spark = f"s3a://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_spark.parquet"
            
            # Exportar con Spark
            df_spark_optimized.coalesce(1).write.mode("overwrite").parquet(s3_path_spark)
            
            print(f"✅ Dataset también exportado con Spark a S3")
            print(f"📁 Ubicación Spark: s3://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_spark.parquet")
            
        except Exception as spark_error:
            print(f"⚠️ Export Spark a S3 falló: {spark_error}")
            print("💡 Verifica configuración AWS en Spark (credenciales, jars S3)")
    
    # Método 3: Alternativa usando s3fs (más simple)
    print(f"\n📤 Método 3: Usando s3fs")
    try:
        import s3fs
        
        # Crear filesystem S3
        fs = s3fs.S3FileSystem()
        
        # Ruta completa en S3
        s3_path_s3fs = f"s3://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_s3fs.parquet"
        
        # Exportar usando s3fs
        with fs.open(s3_path_s3fs, 'wb') as f:
            df_for_spark.to_parquet(f, engine='pyarrow', compression='snappy', index=False)
        
        print(f"✅ Dataset exportado usando s3fs")
        print(f"📁 Ubicación: {s3_path_s3fs}")
        
    except ImportError:
        print("⚠️ s3fs no disponible. Instalar con: pip install s3fs")
    except Exception as s3fs_error:
        print(f"⚠️ Export s3fs falló: {s3fs_error}")
    
    # Listar archivos en el bucket para verificar
    print(f"\n📋 Archivos en s3://{S3_BUCKET}/{S3_PREFIX}/:")
    try:
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET, Prefix=S3_PREFIX)
        if 'Contents' in response:
            for obj in response['Contents']:
                size_mb = obj['Size'] / (1024*1024)
                print(f"   📄 {obj['Key']} ({size_mb:.2f} MB)")
        else:
            print("   📂 Carpeta vacía o no existe")
    except Exception as list_error:
        print(f"   ⚠️ Error listando archivos: {list_error}")
    
    print(f"\n🎯 URLs de acceso:")
    print(f"   Pandas: s3://{S3_BUCKET}/{s3_key_pandas}")
    if 'df_spark_optimized' in locals() and df_spark_optimized is not None:
        print(f"   Spark:  s3://{S3_BUCKET}/{S3_PREFIX}/covid19_mexico_spark.parquet")
    
except ImportError as import_error:
    print(f"❌ Librerías faltantes: {import_error}")
    print("💡 Instalar con: pip install boto3 pyarrow s3fs")
    
except NoCredentialsError:
    print("❌ Error: Credenciales AWS no encontradas")
    print("💡 Configurar con:")
    print("   - AWS CLI: aws configure")
    print("   - Variables de entorno: AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY")
    print("   - IAM Role (si estás en EC2)")
    
except ClientError as client_error:
    error_code = client_error.response['Error']['Code']
    if error_code == 'NoSuchBucket':
        print(f"❌ Error: Bucket '{S3_BUCKET}' no existe")
        print("💡 Crear bucket primero o verificar nombre")
    elif error_code == 'AccessDenied':
        print(f"❌ Error: Sin permisos para acceder a '{S3_BUCKET}'")
        print("💡 Verificar políticas IAM para S3")
    else:
        print(f"❌ Error AWS: {client_error}")
        
except Exception as general_error:
    print(f"❌ Error general al exportar a S3: {general_error}")
    print("💡 Verificar:")
    print("   - Credenciales AWS configuradas")
    print("   - Permisos del bucket")
    print("   - Conectividad a internet")
    print("   - Instalación: pip install boto3 pyarrow s3fs")

print(f"\n✨ Proceso de exportación completado")
=== 💾 EXPORTACIÓN A PARQUET S3 ===

📤 Método 1: Pandas → S3
✅ Dataset exportado con pandas a S3
📁 Bucket: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_pandas.parquet
📦 Tamaño archivo: 0.48 MB
📅 Última modificación: 2025-08-29 05:26:05+00:00

📤 Método 2: Spark → S3
25/08/29 05:26:05 WARN TaskSetManager: Stage 65 contains a task of very large size (3558 KiB). The maximum recommended task size is 1000 KiB.
                                                                                
✅ Dataset también exportado con Spark a S3
📁 Ubicación Spark: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_spark.parquet

📤 Método 3: Usando s3fs
✅ Dataset exportado usando s3fs
📁 Ubicación: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_s3fs.parquet

📋 Archivos en s3://xideralaws-curso-edgargu/covid19-data/:
   📄 covid19-data-json/covid19_mexico_analysis.json (0.00 MB)
   📄 covid19-data/covid19_mexico_pandas.parquet (0.48 MB)
   📄 covid19-data/covid19_mexico_s3fs.parquet (0.48 MB)
   📄 covid19-data/covid19_mexico_spark.parquet/_SUCCESS (0.00 MB)
   📄 covid19-data/covid19_mexico_spark.parquet/part-00000-18404016-c329-42a9-9bff-66e918205935-c000.snappy.parquet (0.50 MB)

🎯 URLs de acceso:
   Pandas: s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_pandas.parquet
   Spark:  s3://xideralaws-curso-edgargu/covid19-data/covid19_mexico_spark.parquet

✨ Proceso de exportación completado

=============================================================================¶

CELDA 25: Limpieza de Recursos¶

=============================================================================¶

In [34]:
print("="*60)
print("LIMPIEZA DE RECURSOS")
print("="*60)

# Limpiar cache
df_clean.unpersist()
spark_df.unpersist()

# Cerrar Spark session
spark.stop()

# Limpiar variables de memoria
import gc
del df, df_clean, spark_df, ml_data, data_scaled
gc.collect()

print("✅ Spark session cerrada")
print("✅ Cache liberado")
print("✅ Variables eliminadas")
print("✅ Memoria liberada")

print("\n" + "="*80)
print("🎉 ANÁLISIS COMPLETADO EXITOSAMENTE")
print("="*80)
print("📊 Visualizaciones generadas: 15")
print("🔍 Análisis estadístico: Completo")
print("🤖 Machine Learning: K-means + PCA")
print("📦 Formato Parquet: Generado")
print("☁️ AWS S3: Configurado")
print("="*80)
============================================================
LIMPIEZA DE RECURSOS
============================================================
✅ Spark session cerrada
✅ Cache liberado
✅ Variables eliminadas
✅ Memoria liberada

================================================================================
🎉 ANÁLISIS COMPLETADO EXITOSAMENTE
================================================================================
📊 Visualizaciones generadas: 15
🔍 Análisis estadístico: Completo
🤖 Machine Learning: K-means + PCA
📦 Formato Parquet: Generado
☁️ AWS S3: Configurado
================================================================================

<<===========================================================>>¶

Documentación del Proyecto Covid 19¶

Resumen de la solución implementada en el proyecto¶

The project is an end-to-end implementation of COVID-19 data analysis for Mexico, leveraging Big Data, Machine Learning, and cloud-architecture technologies. The original dataset, with over 100,000 records, is stored in CSV and Parquet formats in an AWS S3 bucket.

Apache Spark handles the efficient ingestion, cleaning, transformation, and statistical analysis of the data, including the creation of derived categories (such as age groups), correlation and clustering analysis (K-means), and dimensionality reduction (PCA) to identify meaningful patient patterns.

In parallel, an AWS Lambda function automates the processing and statistical calculations, feeding an interactive dashboard built with Streamlit. The dashboard dynamically reads the processed data from S3 and displays key metrics alongside demographic and temporal analyses.

Mechanisms were implemented to export processed data in Parquet format back to S3, ensuring a continuous and reproducible data flow.

Looking ahead, migrating the data to scalable databases such as MySQL or DynamoDB is under consideration, to improve data management, enable efficient querying, and support integration with broader analytical and operational ecosystems.

The solution thus integrates multiple technology layers, from large-scale ingestion and processing with Spark, through cloud automation with Lambda and interactive visualization with Streamlit, to the planned storage architecture, resulting in a robust, agile, and scalable epidemiological analysis system.

Diagrama de Arquitectura del Pipeline¶

image.png

image.png

IAM con Roles y Políticas¶

image.png

image.png

image.png

GitHub Repositorio¶

image.png

Archivos creados en S3 xideralaws-curso-edgargu¶

image.png

Archivos .parquet de carpeta "covid19-data/"¶

image.png

Spark Jobs¶

image.png

image.png

image.png

image.png

image.png

image.png

Código en Lambda de AWS Console¶

image.png

image.png

Output¶

image.png

Y también código generado para obtener la respuesta en Json y mandar esa respuesta dentro de mi bucket¶

image.png

image.png

image.png

Output:¶

image.png

Código Python para Streamlit¶

image.png

image.png

image.png

image.png


Dashboard Streamlit¶

Link: http://localhost:8502/¶

image.png

image.png

image.png

image.png

image.png

image.png

image.png

Intento de presentación por medio de Visual Studio Code en GitHub para Streamlit¶

image.png


Certificados AWS¶

GitHub¶

image.png

image.png

image.png

image.png

image.png